Fluentdのコネクタを使ってBigQueryへのストリーミング挿入を試してみる
はじめに
データアナリティクス事業本部のkobayashiです。
BigQueryへのデータ取り込みの方法としてストリーミングがあります。その機能を試してみたいと思いGCPの公式ドキュメントにある「Fluentd と BigQuery を使用したリアルタイムのログ分析 」の内容に沿ってFluentdからのリアルタイムロギングを試してみたのでその内容をまとめます。
BigQueryのストリーミング挿入
BigQueryにデータを取り込む際には主にバルクロードとストリーミングがあります。 バルクロードはデータソースとしてCSV・ParquetなどのファイルをCloud Storageから読み込んでBigQueryに取り込んだり、BigQuery Data Transfer Service で他のサービスからデータを取り込む方法になります。一方ストリーミングはデータが発生するたびにBigQueryのストリーミングAPIを呼び出してBigQueryにデータを取り込む方法になります。
ストリーミングでデータを取り込むことによってほぼリアルタイムでBigQueryでデータを分析することができるようになります。
- データの読み込みの概要 | BigQuery | Google Cloud
- BigQuery へのデータのストリーミング | Google Cloud
- BigQuery Data Transfer Service の概要 | Google Cloud
今回はこのストリーミング取り込みを試してみるために「Fluentd と BigQuery を使用したリアルタイムのログ分析 | Cloud アーキテクチャ センター」 の内容を実施しますが、ドキュメントと違いGCP外にあるWebサーバーからのログ転送を試してみます。これは公式ドキュメントとはちょっと違うことをやってみたかったのと単に手持ちのサーバーでNginxのログを取れるサーバーが他のサービスにあったためです。
BigQueryのストリーミング挿入を試してみる
すでにNginxが動いているサーバーがある前提で進めるため、行う作業として以下になります。
- WebサーバーにFluentdのインストールとBigQueryコネクタのインストールを行う
- BigQueryにNginxのログ用のテーブルを作成する
- BigQueryコネクタ用のサービスアカウントを作りロールを割り当てる
- Fluentdの設定ファイルにBigQueryコネクタの設定する
環境
- centos 7.8.2003
- nginx 1.18.0
それではFluentd と BigQuery を使用したリアルタイムのログ分析を進めます。
WebサーバーにFluentdとBigQueryコネクタのインストールする
FluentdとFluentd-BigQueryコネクタのインストールは以下のコマンドで実行します。 ただコマンドを実行するだけなので特に詰まる箇所はないと思います。
$ sudo curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent4.sh | sh $ sudo systemctl start td-agent.service $ sudo systemctl enable td-agent.service $ sudo /usr/sbin/td-agent-gem install fluent-plugin-BigQuery
BigQueryにNginxのログ用のテーブルを作成する
BigQueryウェブUIでテーブルを作成します。はじめにデータセットを作成し、そのデータセット内にテーブルを作成します。
手順1) プロジェクトを選択しデータセットを作成
を押下し、データセットを作成する。
手順2) 作成したデータセットを選択しテーブルを作成
を押下し、テーブルを作成する。その際、スキーマでテキストとして編集
を選択し、以下のカラム定義を入力する。
[ { "type": "TIMESTAMP", "name": "time" }, { "type": "STRING", "name": "remote" }, { "type": "STRING", "name": "host" }, { "type": "STRING", "name": "user" }, { "type": "STRING", "name": "method" }, { "type": "STRING", "name": "path" }, { "type": "STRING", "name": "code" }, { "type": "INTEGER", "name": "size" }, { "type": "STRING", "name": "referer" }, { "type": "STRING", "name": "agent" }, { "type": "STRING", "name": "http_x_forwarded_for" } ]
以上でFluentdのログを保存するテーブルが作成できました。
BigQueryコネクタ用のサービスアカウントを作りロールを割り当てる
次にWebサーバーのBigQueryコネクタで使用するサービスアカウントを作成し、必要なロールを割り当てます。
手順1) GCPのウェブUIからIAMと管理 > サービスアカウント
の画面でサービスアカウントを作成
を押下しサービスを作成する。
- サービスアカウント名 : 適当な名前の入力
- 「サービスアカウントにプロジェクトへのアクセスを許可する」「ユーザーにこのサービスアカウントへのアクセスを許可」は何も入力せずに省略する
手順2)
作成したサービスアカントを選択し、キー
のタブでサービスアカウントの鍵を作成する。
これでサービスアカウントの作成は完了したので次にBigQueryのテーブルへのアクセス権を設定します。
手順3) BigQueryウェブUIへ移動し先程作成したテーブルを選択しテーブルを共有
を押下する。
手順4) テーブルの権限ダイアログで、メンバーを追加で先程作成したサービスアカウントを選択し、権限にてBigQueryデータ編集者
を選択して追加を押下する。
これでテーブルの権限にBigQueryデータ編集者
としてサービスアカウントが追加されました。
Fluentdの設定ファイルにBigQueryコネクタの設定する
最後にWebサーバーのFluentdの設定ファイルにBigQueryコネクタの設定を行います。
手順1) Webサーバーへサーバーアカウントの鍵を送る。
$ scp {project_name}-1c3a5abc2e41.json {Webサーバー}:
手順2) Fluentdの設定ファイルに以下を追記する。
- 公式のドキュメントと異なり認証方式をサービスアカウントの鍵方式へ変更しています。
$ vi /etc/td-agent/td-agent.conf <source> @type tail @id input_tail <parse> @type nginx </parse> path /var/log/nginx/www.example.com/access.log # nginxのログを指定 pos_file /var/log/td-agent/httpd-access.log.pos tag nginx.access </source> <match nginx.access> @type BigQuery_insert # Authenticate with BigQuery using the VM's service account. auth_method json_key json_key /home/centos/{project_name}-1c3a5abc2e41.json # サービスアカウントの鍵を指定 project {project_name} dataset fluentd # BigQueryのデータセット名 table nginx_access # BigQueryのテーブル名 fetch_schema true <inject> # Convert fluentd timestamp into TIMESTAMP string time_key time time_type string time_format %Y-%m-%dT%H:%M:%S.%NZ </inject> </match>
手順3) Fluentdを再起動する。
$ sudo systemctl restart td-agent.service
その際にFluentdのログを確認すると設定に問題がなければ以下のようなログが出ますが、
$ sudo tail -f /var/log/td-agent/td-agent.log 2021-04-20 05:28:34 +0900 [info]: #0 shutting down input plugin type=:debug_agent plugin_id="input_debug_agent" 2021-04-20 05:28:34 +0900 [info]: #0 shutting down output plugin type=:tdlog plugin_id="output_td" 2021-04-20 05:28:34 +0900 [info]: #0 shutting down output plugin type=:BigQuery_insert plugin_id="object:7a8" 2021-04-20 05:28:34 +0900 [info]: #0 shutting down output plugin type=:stdout plugin_id="output_stdout" 2021-04-20 05:28:34 +0900 [info]: Worker 0 finished with status 0
サービスアカウントの権限周りでミスが有ると以下のようなログが出ます。最初、テーブルへの権限不足で出してしまいました。
$ sudo tail -f /var/log/td-agent/td-agent.log 2021-04-20 04:54:25 +0900 [info]: #0 fluentd worker is now running worker=0 2021-04-20 04:58:39 +0900 [error]: #0 tables.get API project_id="{project_name}" dataset="fluentd" table="nginx_access" code=403 message="accessDenied: Access Denied: Table {project_name}:fluentd.nginx_access: Permission BigQuery.tables.get denied on table {project_name}:fluentd.nginx_access (or it may not exist)." 2021-04-20 04:58:39 +0900 [warn]: #0 emit transaction failed: error_class=RuntimeError error="failed to fetch schema from BigQuery" location="/opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-BigQuery-2.2.0/lib/fluent/plugin/out_BigQuery_base.rb:196:in `fetch_schema'" tag="nginx.access"
以上で設定は終わりです。
Webサーバーにアクセスがあった後に BigQueryウェブUIを確認するとログがBigQueryへストリーミング挿入されていることが確認できます。
まとめ
BigQueryへのストリーミングデータの取り込みをFluentdのBigQueryコネクタを使って試してみました。少しの設定でリアルタイムのログをBigQueryへ取り込めました。
今回は単純にWebサーバーからBigQueryへ直接ストリーミングを行いましたが、BigQuery以外のGCPのサービスでFluentdのデータを使いたい場合はPub/Subを経由する方法もありますのでそちらの方も試してみたいと思います。
最後まで読んで頂いてありがとうございました。